package io.xxx.eva.task.select;

import com.alibaba.excel.EasyExcel;
import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.event.AnalysisEventListener;
import io.xxx.eva.receiver.Room;
import io.xxx.eva.receiver.RoomRepository;
import io.xxx.eva.task.Task;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.core.io.Resource;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import reactor.core.publisher.Flux;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * {@link ReceiverSelector} that resolves the receivers of a task from a spreadsheet.
 *
 * <p>The spreadsheet is downloaded from the URL stored under the {@code "url"} key of the task's
 * select content. The first column (index 0) of every data row is taken as a room id; ids are
 * looked up through {@link RoomRepository} in batches and the rooms found are emitted on the
 * returned {@link Flux}.
 */
@Slf4j
@Component
public class FileReceiverSelector implements ReceiverSelector {

    /** Number of room ids collected before they are resolved with a single repository query. */
    private static final int BATCH_SIZE = 100;

    private final RestTemplate restTemplate;

    private final RoomRepository roomRepository;

    public FileReceiverSelector(RoomRepository roomRepository, RestTemplateBuilder restTemplateBuilder) {
        this.restTemplate = restTemplateBuilder.build();
        this.roomRepository = roomRepository;
    }

    /**
     * Streams the rooms whose ids are listed in the first column of the task's spreadsheet.
     *
     * <p>Only the first sheet is read. NOTE(review): EasyExcel's default head-row handling is left
     * untouched, so the first row is presumably treated as a header and not delivered to the
     * listener — confirm this matches the uploaded files.
     *
     * @param task task whose select content holds the spreadsheet URL under {@code "url"}
     * @return a flux emitting the rooms found for the listed ids; it terminates with an error if
     *         the download, the parsing or a repository lookup fails
     */
    @Override
    public Flux<Room> select(Task task) {
        return Flux.create(emitter -> {
            // try-with-resources guarantees the downloaded stream is released on every path.
            try (InputStream is = getInputStream(task)) {
                EasyExcel.read(is, new AnalysisEventListener<Map<Integer, String>>() {

                    /** Ids read from the sheet that have not been resolved yet. */
                    private final List<String> roomIds = new ArrayList<>(BATCH_SIZE);

                    @Override
                    public void invoke(Map<Integer, String> data, AnalysisContext context) {
                        String roomId = data.get(0);
                        // An empty first cell yields no usable id; passing null ids to the
                        // repository would fail the whole batch.
                        if (roomId == null || roomId.trim().isEmpty()) {
                            return;
                        }
                        roomIds.add(roomId);
                        if (roomIds.size() >= BATCH_SIZE) {
                            flush();
                        }
                    }

                    @Override
                    public void doAfterAllAnalysed(AnalysisContext context) {
                        // Resolve the last, partially filled batch.
                        flush();
                    }

                    /** Resolves the buffered ids, emits the rooms found and empties the buffer. */
                    private void flush() {
                        if (roomIds.isEmpty()) {
                            return;
                        }
                        for (Room room : roomRepository.findAllById(roomIds)) {
                            emitter.next(room);
                        }
                        roomIds.clear();
                    }
                }).sheet().doRead(); // read(...) only builds the reader; doRead() actually parses the sheet
            } catch (IOException | RuntimeException e) {
                emitter.error(e);
                return;
            }
            emitter.complete();
        });
    }

    /**
     * Downloads the spreadsheet referenced by the task.
     *
     * @param task task whose select content holds the spreadsheet URL under {@code "url"}
     * @return an open stream over the downloaded content, never {@code null}; the caller must close it
     * @throws IOException if the response carries no body or the stream cannot be opened
     */
    private InputStream getInputStream(Task task) throws IOException {
        String url = task.getSelectContent().get("url");
        Resource resource = restTemplate.getForObject(url, Resource.class);
        if (resource == null) {
            throw new IOException("No content returned for receiver file: " + url);
        }
        return resource.getInputStream();
    }
}
